package java.topK;

import java.io.IOException;
import java.net.URI;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTopKNumJob extends Configured implements Tool {

    /**
     * @author Edison Chou
     * @version 1.0
     */
    public static class MyMapper extends
            Mapper<LongWritable, Text, NullWritable, LongWritable> {
        public static final int K = 100;
        private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();

        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, NullWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            try {
                long temp = Long.parseLong(value.toString().trim());
                tm.put(temp, temp);
                if (tm.size() > K) {
                    //tm.remove(tm.firstKey());
                    // 如果是求topk个最小的那么使用下面的语句
                    tm.remove(tm.lastKey());
                }
            } catch (Exception e) {
                context.getCounter("TopK", "errorLog").increment(1L);
            }
        };

        protected void cleanup(
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (Long num : tm.values()) {
                context.write(NullWritable.get(), new LongWritable(num));
            }
        };
    }

    /**
     * @author Edison Chou
     * @version 1.0
     */
    public static class MyReducer extends
            Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
        public static final int K = 100;
        private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();

        protected void reduce(
                NullWritable key,
                java.lang.Iterable<LongWritable> values,
                Reducer<NullWritable, LongWritable, NullWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (LongWritable num : values) {
                tm.put(num.get(), num.get());
                if (tm.size() > K) {
                    //tm.remove(tm.firstKey());
                    // 如果是求topk个最小的那么使用下面的语句
                    tm.remove(tm.lastKey());
                }
            }
            // 按降序即从大到小排列Key集合
            for (Long value : tm.descendingKeySet()) {
                context.write(NullWritable.get(), new LongWritable(value));
            }
        };
    }

    // 输入文件路径
    public static String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/seq100w.txt";
    // 输出文件路径
    public static String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/topkapp";

    @Override
    public int run(String[] args) throws Exception {
        // 首先删除输出路径的已有生成文件
        FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
        Path outPath = new Path(OUTPUT_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = new Job(getConf(), "TopKNumberJob");
        // 设置输入目录
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // 设置自定义Mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 设置自定义Reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // 设置输出目录
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // map端输出启用压缩
        conf.setBoolean("mapred.compress.map.output", true);
        // reduce端输出启用压缩
        conf.setBoolean("mapred.output.compress", true);
        // reduce端输出压缩使用的类
        conf.setClass("mapred.output.compression.codec", GzipCodec.class,
                CompressionCodec.class);

        try {
            int res = ToolRunner.run(conf, new MyTopKNumJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}